在這個案例模板裡,加入種子資料只是為了模擬一般資料處理管道的資料源,所以我只簡單帶過。除了上一篇文章提到的dbt_project.yml
裡的設定:
seeds:
+schema: SALESFORCE
要注意的是相對應的/seeds
目錄裡面的幾個.csv
檔案:
- accounts.csv
- contacts.csv
- leads.csv
- oppertunities.csv
還有資料源的設定檔:
/models/psa/salesforce_sources.yml
sources:
- name: salesforce
description: sample salesforce data
database: DBT_DATA_VAULT_TEMPLATE
schema: DBT_SALESFORCE
tables:
- name: accounts
...
- name: contacts
...
- name: leads
...
- name: oppurtunities
...
雖然各個.csv
檔已經通過上次執行的dbt seed
指令加入到資料庫裡,你還是需要通過定義相對應各個檔案的sources
才能通過dbt來執行。
在一般情況下,種子資料功能其實很少會被用在創建資料源。比較常見的是用在維持一些很少改變的維度或變數的映射表(mapping table),例如ISO國碼、幣值等。
上篇提到,Persistent Staging Area直譯是持久暫存區,在DV資料處理管道內起到幾個比較特別的作用:
資料格式整理與統一
由於資料源通常是直接從原系統內直接導入的,在資料類型、格式、命名上都很容易有偏差。為了簡化實際DV設定的邏輯,通常都會把這一類的問題事先處理完成。這樣,各個DV資料模型就不會需要太多的客製化或差異化的處理邏輯
資料緩存、備份
相對應PSA裡的Persistent,上篇提到這層處理的物化設定是:
+materialized: table
雖然在不同的資料庫裡table
的定義稍微不同,但簡單來說就是在資料處理運行時用到create table as ...
。這樣代表這層的資料不會即時的隨著資料源變化,而可以做到備份的作用。在更大型的資料源庫上,PSA的設計也會複雜化,變成incremental
。
由於主要是對應資料源設計,這裡的資料模型代碼就不會非常複雜。這裡就以其中一個檔案做案例:
models/psa/psa_salesforce_opportunities.sql
with source as (
select * from {{ source('salesforce', 'opportunities') }}
),
renamed as (
select
BATCHID,
OPPURTUNITYID as OPPORTUNITYID,
COMPANEXTID as ACCOUNTID,
to_decimal(replace(replace(AMOUNT,'$',''),',',''),9,2) as AMOUNT,
PROJECT_NAME,
OPPURTUNITY_NAME as OPPORTUNITY_NAME,
STAGE,
CLOSE_DATE,
DATECREATED,
MODIFIEDDATE
from source
)
select * from renamed
除了基本的source定義以外,可以看出這裡邏輯主要是為了剛才提到的資料格式整理與統一。主要只是為了簡單的表列重命名、排錯:
...
OPPURTUNITYID as OPPORTUNITYID,
COMPANEXTID as ACCOUNTID,
...
OPPURTUNITY_NAME as OPPORTUNITY_NAME,
...
和格式排錯與單一化:
...
to_decimal(replace(replace(AMOUNT,'$',''),',',''),9,2) as AMOUNT,
...
對SQL如果不是很熟悉的話,這行看起來可能有點複雜,但其實只是將美化過的的貨幣字串值轉換為小數格式。